# This script reads the pickle files in the Google Drive and
# outputs Hive-partitioned Parquet. This is preferable because
# we don't have to load all columns into memory, we can use 
# predicate pushdown to filter on date ranges without loading
# lots of data, and we can read the data into Python or R using
# Arrow, instead of using Python-specific binary files.

from pathlib import Path
import json
import logging
import pickle
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm
from schemas import NEWSWHIP_SCHEMA


def _join_authors(value):
    """Collapse one `authors` cell into a single comma-separated string.

    Missing values (None or a float NaN) become pd.NA. A value that is
    already a string is returned unchanged, because joining a string would
    insert the separator between each of its characters. Any other value is
    handed to ``", ".join`` and must therefore be an iterable of strings.
    """
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return pd.NA
    if isinstance(value, str):
        return value
    return ", ".join(value)


def main():
    """Convert the raw pickle files into a Hive-partitioned Parquet dataset.

    Every ``*.pkl`` file under ``data/raw/iddp_investigation`` (except those
    matching ``Newswire/*/*.pkl``) is unpickled, flattened with
    ``pd.json_normalize``, given ``creation_date``/``year``/``month`` columns
    derived from ``publication_timestamp`` (read as milliseconds), padded
    with any columns of ``NEWSWHIP_SCHEMA`` it lacks, and appended to the
    dataset at ``data/processed/newswhip`` partitioned by ``source.domain``,
    ``year`` and ``month``.

    Files whose frame is empty are skipped, as are files whose ``article``
    column holds only integers (logged as an error).

    Raises:
        FileNotFoundError: if the raw data directory does not exist.
    """
    # this directory structure should mirror Google Drive
    RAW_DATA_PATH = Path("data/raw/iddp_investigation")
    # Raise explicitly instead of asserting: asserts are stripped under
    # `python -O`, which would turn a missing directory into a silent no-op.
    if not RAW_DATA_PATH.exists():
        raise FileNotFoundError(f"raw data directory not found: {RAW_DATA_PATH}")
    PROCESSED_DATA_PATH = Path("data/processed/newswhip")

    # exclude newswire reports, which have a different schema
    # (the pattern only matches pickles exactly two levels below Newswire/)
    newswire_files = set(RAW_DATA_PATH.glob("Newswire/*/*.pkl"))
    # sorted so that the processing order, and hence the log, is reproducible
    target_files = sorted(set(RAW_DATA_PATH.rglob("*.pkl")) - newswire_files)

    for fname in tqdm(target_files):
        logging.info(fname)
        # NOTE(security): pickle.load can execute arbitrary code; only run
        # this script on pickle files from a trusted source.
        with open(fname, "rb") as p:
            df = pickle.load(p)
        # DataFrame.empty is already True when either axis has length 0
        if df.empty:
            logging.debug("%s has no rows", fname.stem)
            continue

        # flatten nested data into a rectangular data frame
        json_struct = json.loads(df.to_json(orient="records", default_handler=str))
        df = pd.json_normalize(json_struct)

        df['creation_date'] = pd.to_datetime(df['publication_timestamp'], unit='ms')
        df['year'] = df['creation_date'].dt.year
        df['month'] = df['creation_date'].dt.month
        df['authors'] = df['authors'].apply(_join_authors)

        # the Arrow conversion below needs every schema column to be present
        for col in NEWSWHIP_SCHEMA.names:
            if col not in df.columns:
                df[col] = pd.NA

        try:
            table = pa.Table.from_pandas(df, preserve_index=False, schema=NEWSWHIP_SCHEMA)
        except (KeyError, pa.lib.ArrowTypeError):
            # handle broken article text
            # this should be fixed by changing the default_handler, but
            # don't want to remove yet
            if df['article'].map(lambda x: isinstance(x, int)).all():
                logging.error("%s has bad article data", fname.stem)
                continue

            raise
        except OverflowError:
            # handle exceptions in `article`.
            # this should be fixed by changing the default_handler, but
            # don't want to remove yet
            logging.debug("handling overflow error in %s", fname.stem)
            df['article'] = df['article'].apply(str)
            table = pa.Table.from_pandas(df, preserve_index=False, schema=NEWSWHIP_SCHEMA)

        pq.write_to_dataset(table,
                            root_path=PROCESSED_DATA_PATH,
                            partition_cols=['source.domain', 'year', 'month'],
                            compression='snappy')


if __name__ == "__main__":
    # Everything from DEBUG upwards goes to debug.log only; progress on the
    # terminal is monitored through the tqdm bar. For more detail on the
    # console, append logging.StreamHandler() to this list.
    log_handlers = [logging.FileHandler("debug.log")]

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)-8s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        handlers=log_handlers,
    )

    main()